
use crate::conf_init::{MapResult, MapErr};
use std::collections::HashMap;
use crossbeam_skiplist_piedb::SkipMap;
use crate::physical_plan::window::window_group::{WindowGroup, TimeOutWindowGroup};
use ouroboros::self_referencing;
use std::ops::{Add,Sub};
use std::marker::PhantomData;
use chrono::{Local, Timelike, Duration};
use crate::timeout::timer_reg_repeat;
use crossbeam_channel::Receiver;
use crate::logical_plan::expr::window::TimeUnit;
use crate::physical_plan::window::accumulator::{PhysicalPlanExprAccumulatorComb};

pub mod window_group;
pub mod accumulator;

/*
A single event must expose aggregation features (a value per key) before any
later aggregation step can be applied to it.
*/
pub trait SortByValue<K,V>{
   // Produces a `V` for the key `k`. The return type is a plain `V` (no `Option`/`Result`).
   // NOTE(review): the name suggests implementors must always be able to produce a value — confirm.
   fn get_key_value_always(&self, k: &K) ->V;
}

/*
Both WindowElement and WindowElement::MapValue must be cheap to clone
(the window code in this file clones them when storing and grouping elements).
*/
pub trait WindowElement<K,V>: Ord+Send+Clone+'static{
    // Value handed to the accumulator / partition hasher for this element.
    type MapValue: SortByValue<K,V>+Clone+Send;
    // Ordering key under which the element is stored in the window.
    fn index(&self)->WindowIndex;
    // Defaults to `false`; `WindowElementJustIndex` overrides it to `true`.
    // NOTE(review): presumably marks elements that carry only an index and no payload — confirm.
    fn just_index(&self)->bool{
        false
    }
    // Returns an owned `MapValue` for this element.
    fn index_to_value(&self)->Self::MapValue;
    // Shared access to the element's `MapValue`.
    fn index_value_ref(&self)->&Self::MapValue;
    // Exclusive access to the element's `MapValue`.
    fn index_value_mut(&mut self)->&mut Self::MapValue;
}

/// Ordering key of a window element.
///
/// The raw value is a `u128`: `new_seque_unique` places the sequence component in the
/// upper 64 bits and the uniqueness component in the lower 64 bits, so comparing two
/// indexes compares the sequence first and the unique part second.
#[derive(Eq,PartialEq,Ord,PartialOrd,Clone,Copy,Debug)]
pub struct WindowIndex {
    // high 64 bits: sequence, low 64 bits: unique id
    idx: u128,
}

impl Sub for WindowIndex{
    type Output = Self;

    /// Subtracts the raw values with the plain `-` operator (same overflow behaviour).
    fn sub(self, rhs: Self) -> Self::Output{
        let raw = self.idx - rhs.idx;
        WindowIndex::new(raw)
    }
}

impl Add for WindowIndex{
    type Output = Self;

    /// Adds the raw values with the plain `+` operator (same overflow behaviour).
    fn add(self, rhs: Self) -> Self::Output{
        let raw = self.idx + rhs.idx;
        WindowIndex::new(raw)
    }
}

impl WindowIndex{
    /// Wraps a raw 128-bit value.
    fn new(value: u128)->WindowIndex{
        WindowIndex{ idx: value }
    }

    /// Builds an index with `seque` in the upper 64 bits and `unique` in the lower 64 bits.
    pub fn new_seque_unique(seque: u64, unique: u64)->WindowIndex{
        // The shifted sequence has an all-zero low half, so OR-ing is the same as adding.
        let high = u128::from(seque) << 64;
        let low = u128::from(unique);
        WindowIndex::new(high | low)
    }

    /// Computes `(self with all low 64 bits set) + window_time - (1 << 64)`.
    ///
    /// For a `window_time` of the form `w << 64` this is the largest index whose
    /// sequence part equals `self`'s sequence plus `w - 1`.
    fn index_slide_window(&self, window_time: &Self)->WindowIndex{
        let low_half_mask = u128::from(u64::MAX);
        let one_sequence_step = WindowIndex::new(1u128 << 64);
        let saturated_low = WindowIndex::new(self.idx | low_half_mask);
        // Evaluation order (add first, subtract second) is kept as-is.
        (saturated_low + *window_time) - one_sequence_step
    }
}
// Payload-free window element that only carries a `u64` value tagged with the unit of the
// constructor that produced it (`new_sec`, `new_micro`, `new_mill`).
// `Window::insert_timeout` builds one of these to obtain the index of a timer tick.
// Note: the derived `Ord` compares the variant first (declaration order), then the value.
#[derive(Clone,Ord,PartialOrd,Eq,PartialEq)]
enum WindowElementJustIndex{
    // Produced by `new_sec`.
    Second(u64),
    // Produced by `new_micro`.
    Micro(u64),
    // Produced by `new_mill`.
    Mill(u64),
}
impl WindowElement<(),Self> for WindowElementJustIndex{
    type MapValue = WindowElementJustIndex;

    /// Places the carried `u64` in the upper 64 bits of the index; the lower 64 bits are zero.
    fn index(&self)->WindowIndex{
        // Every variant wraps the same kind of payload, so one arm covers them all.
        let raw = match self{
            WindowElementJustIndex::Second(value)
            | WindowElementJustIndex::Micro(value)
            | WindowElementJustIndex::Mill(value) => *value,
        };

        WindowIndex{
            idx: u128::from(raw) << 64,
        }
    }

    /// Always `true` for this type.
    fn just_index(&self)->bool{
        true
    }

    /// The element is its own map value, so this is a clone of `self`.
    fn index_to_value(&self)->Self::MapValue{
        Clone::clone(self)
    }

    /// The element is its own map value, so this is `self`.
    fn index_value_ref(&self)->&Self::MapValue{
        self
    }

    /// The element is its own map value, so this is `self`.
    fn index_value_mut(&mut self)->&mut Self::MapValue{
        self
    }
}

impl <K> SortByValue<K,Self> for WindowElementJustIndex{
    /// Ignores the key and returns a clone of the element itself.
    fn get_key_value_always(&self, _key: &K) ->WindowElementJustIndex{
        Clone::clone(self)
    }
}

impl WindowElementJustIndex{
    /// Builds a marker holding the current Unix time in whole seconds.
    pub fn new_sec()->Self{
        // `Timelike::second()` only yields the second-of-minute (0..=59) and wraps every
        // minute, which makes the resulting index useless for ordering against earlier
        // indexes. Use the epoch timestamp instead, like `new_mill` does for milliseconds.
        let now: u64 = Local::now().timestamp() as u64;
        WindowElementJustIndex::Second(now)
    }
    /// Builds a marker holding the current Unix time in microseconds.
    pub fn new_micro()->Self{
        // The previous `timestamp_millis() / 1000` produced *seconds*, while the timer for
        // `TimeUnit::Mic` is configured with `Duration::microseconds`. Build the microsecond
        // timestamp from whole seconds plus the sub-second part.
        let now = Local::now();
        let micros: i64 = now.timestamp() * 1_000_000 + i64::from(now.nanosecond() / 1_000);
        WindowElementJustIndex::Micro(micros as u64)
    }
    /// Builds a marker holding the current Unix time in milliseconds.
    pub fn new_mill()->Self{
        let now: u64 = Local::now().timestamp_millis() as u64;
        WindowElementJustIndex::Mill(now)
    }
}

// State of one window: its element store, the accumulator evaluated over it, and the
// pending "wait window" deadline.
// The struct is self-referencing (ouroboros): `wait_window` holds a mutable borrow of
// `skip_map`, so the map is reached through `wait_window.1`.
#[self_referencing]
struct WindowCheck<K,V,T: WindowElement<K,V>,Accumulator: PhysicalPlanExprAccumulatorComb<T::MapValue>>{
    first_time: WindowIndex,// index (time) of the first element of the current window; raw 0 is used as "unset"
    last_time: WindowIndex,// largest index recorded via `set_last_time`; raw 0 is used as "unset"
    check: bool,// initialised to `false` in `build_new`; not read elsewhere in the visible code
    pair_f: Accumulator,// accumulator fed by `add_and_check` and queried by `wait_window_check`
    skip_map: SkipMap<WindowIndex, T>,// elements ordered by index
    #[borrows(mut skip_map)]
    wait_window: (Option<WindowIndex>,&'this mut SkipMap<WindowIndex,T>),// (pending deadline, borrowed element map)
    phantom_k: PhantomData<K>,
    phantom_v: PhantomData<V>,
}

impl <K,V,T: WindowElement<K,V>,Accumulator: PhysicalPlanExprAccumulatorComb<T::MapValue>> WindowCheck<K,V,T,Accumulator>{
    /// Creates an empty window state around `pair_f`: both times are the raw-0 "unset"
    /// marker, the element map is empty and no wait-window deadline is pending.
    fn build_new(pair_f: Accumulator) ->WindowCheck<K,V,T,Accumulator> {
        let unset = WindowIndex::new(0);

        WindowCheckBuilder {
            first_time: unset,
            last_time: unset,
            check: false,
            pair_f,
            skip_map: SkipMap::new(),
            // The self-referencing field starts without a deadline and borrows the map above.
            wait_window_builder: |elements: &mut SkipMap<WindowIndex, T>| (None,elements),
            phantom_k: PhantomData,
            phantom_v: PhantomData,
        }.build()
    }
    /// Feeds `element`'s map value into the accumulator, then evaluates the accumulator.
    ///
    /// Returns the result of `pair_f.check()`. An `Err` from `pair_f.add()` is propagated
    /// by `?` before `check()` is called.
    async fn add_and_check(&mut self, element: &T)->Result<bool,()>{
        // The accumulator is only reachable through the ouroboros field accessor.
        return self.with_mut(|fields| async {
            //fields.pair_f.add_and_check(element.index_to_value()).await
            fields.pair_f.add(element.index_to_value()).await?;
            fields.pair_f.check().await
        }).await;
    }
    /// Overwrites `first_time` with a copy of `time`.
    fn set_first_time(&mut self, time: &WindowIndex){
        let value = *time;
        self.with_mut(|fields| *fields.first_time = value);
    }
    /// Overwrites `last_time` with a copy of `time`.
    fn set_last_time(&mut self, time: &WindowIndex){
        let value = *time;
        self.with_mut(|fields| *fields.last_time = value);
    }
    /// Advances the window start by `time`.
    ///
    /// Elements are popped from the front of the map until one with an index at or past
    /// `first_time + time` is found; that element is put back and becomes the new
    /// `first_time`. Everything popped before it is discarded. If the map runs empty,
    /// `first_time` and `last_time` are reset to the raw-0 "unset" marker.
    fn slide(&mut self, time: WindowIndex){
        let threshold = *self.borrow_first_time() + time;

        self.with_mut(|fields| {
            loop{
                match fields.wait_window.1.pop_front(){
                    // Still before the new window start: drop it and keep popping.
                    Some(entry) if *entry.key() < threshold => continue,
                    // First survivor: restore it and anchor the window on it.
                    Some(entry) => {
                        *fields.first_time = *entry.key();
                        fields.wait_window.1.insert(*entry.key(), entry.value().clone());
                        break;
                    },
                    // Nothing left in the window.
                    None => {
                        *fields.first_time = WindowIndex::new(0);
                        *fields.last_time = WindowIndex::new(0);
                        break;
                    },
                }
            }
        })
    }
    /*
       Called when a new WindowElement (or timer tick) arrives, to examine the pending wait_window:
            once the wait_window's wait_time has elapsed, the accumulator (PairFComb) is checked;
                if it is satisfied, a group of elements is returned
                if it is not satisfied, wait_window.0 is cleared and None is returned
                in both of these cases the window is then slid by `slide_time`

       `index` is the index of the incoming element/tick, `wait_time` the grace period after
       the pending deadline, `slide_time` the amount passed to `slide`.
       If no deadline is pending, or the grace period has not elapsed, nothing changes and
       None is returned.
    */
    async fn wait_window_check(&mut self, index: WindowIndex, wait_time: WindowIndex, slide_time: WindowIndex)
        ->Option<WindowGroup<K,V,T>>{

        // The closure yields (group to emit, whether the window must slide afterwards).
        let (res,slide) =  self.with_mut(|fields| async {

            if let Some(wait_window) = fields.wait_window.0{
                //println!("wait_window_check index {:?} wait_window.last_time {:?} wait_time {:?}", index.idx>>64, wait_window.idx>>64, wait_time.idx>>64);
                // `index > wait_window` is tested first so the subtraction cannot underflow.
                if index > wait_window && index - wait_window > wait_time {
                    //println!("wait_window_check index try check");
                    // An `Err` from `check()` is treated the same as `Ok(false)`.
                    if let Ok(check) = fields.pair_f.check().await && check {
                        if let Some(range_end) = fields.wait_window.0.take() {
                            // Collect clones of every stored element up to and including the deadline.
                            let range = fields.wait_window.1.range(WindowIndex::new(0)..=range_end);
                            let mut group = WindowGroup::<K,V,T>::new();
                            for t in range {
                                group.push(t.value().clone());
                            }
                            fields.wait_window.0 = None;
                            return (Some(group),true);
                        } else {
                            return (None,true);
                        }
                    } else {
                        fields.wait_window.0 = None;
                        return (None,true);
                    }
                }else{
                    return (None,false);
                }
            }else{
                return (None, false);
            }
        }).await;

        // Sliding needs `&mut self` again, so it happens after the field borrow has ended.
        if slide{
            self.slide(slide_time);
        }

        return res;
    }
    /// Arms the wait window when an incoming index has moved past the window period.
    ///
    /// If no deadline is pending, one is derived from `first_time` and `window_time`
    /// via `index_slide_window`; an already pending deadline is left untouched.
    /// `_index` is currently unused.
    fn wait_window_set(&mut self, _index: WindowIndex, window_time: WindowIndex){
        self.with_mut(|fields|{
            // A deadline is already pending: keep it.
            if fields.wait_window.0.is_some(){
                return;
            }

            let deadline = fields.first_time.index_slide_window(&window_time);
            fields.wait_window.0 = Some(deadline);
        });
    }
    /// Reports whether a wait-window deadline is currently pending.
    fn waited(&self)->bool{
        let (pending_deadline, _) = self.borrow_wait_window();
        pending_deadline.is_some()
    }
}

// Maps a value of type `D` to a partition key.
// `Window::insert` calls this with an element's `MapValue` and uses the returned `usize`
// as the key of the per-partition `WindowCheck` map.
#[async_trait]
pub trait PartitionHasher<D>{
    // Takes `&mut self`, so implementations may keep and update internal state.
    async fn hash_value(&mut self, d: D)->usize;
}

// Storage layout of a `Window`: either one shared `WindowCheck`, or one `WindowCheck`
// per partition key produced by the `PartitionHasher`.
enum PartitionWindow<K,V,T: WindowElement<K,V>,P: PartitionHasher<T::MapValue>,Accumulator: PhysicalPlanExprAccumulatorComb<T::MapValue>>{
    // No partitioning: a single window state.
    NoPart(WindowCheck<K,V,T,Accumulator>,PhantomData<K>,PhantomData<V>),
    // Partitioned: the hasher plus the window states keyed by its hash value.
    Part(P, HashMap<usize,WindowCheck<K,V,T,Accumulator>>,PhantomData<K>,PhantomData<V>),
}

async fn partition_hash<D,P: PartitionHasher<D>>(pt: &mut P, d: D)->usize{
    pt.hash_value(d).await
}

// A sliding window over elements ordered by `WindowIndex`, optionally partitioned.
// `window_time`, `slide_time` and `wait_time` are stored by `build_new` as
// `(value as u128) << 64`, i.e. in the sequence half of a `WindowIndex`.
pub struct Window<K,V,T: WindowElement<K,V>,P: PartitionHasher<T::MapValue>,Accumulator: PhysicalPlanExprAccumulatorComb<T::MapValue>+Clone>{
    // Name passed to the timer registration and used in its error message.
    name: String,
    // Accumulator template; cloned when a `WindowCheck` is created.
    pair_f: Accumulator,
    // Length of one window.
    window_time: WindowIndex,
    // Amount the window start advances after a wait window was evaluated.
    slide_time: WindowIndex,
    // Grace period after the window deadline before the accumulator is checked.
    wait_time: WindowIndex,
    // Unit of the three durations above; also selects the timeout marker constructor.
    time_unit: TimeUnit,
    // Single or per-partition window state.
    partition_window: PartitionWindow<K,V,T,P,Accumulator>,
    // Receiver returned by `timer_reg_repeat` when `auto_timeout` was requested, else `None`.
    timeout: Option<Receiver<bool>>,
    phantom: PhantomData<P>,
}

impl <K,V,T: WindowElement<K,V>,P: PartitionHasher<T::MapValue>, Accumulator: PhysicalPlanExprAccumulatorComb<T::MapValue>+Clone> Window<K,V,T,P,Accumulator>{
    pub fn build_new(name: String,
                     window_time: u64, slide_time: u64, wait_time: u64, time_unit: TimeUnit,
                     partition: Option<P>,
                     pair_f: Accumulator, auto_timeout: bool)
                     ->MapResult<Window<K,V,T,P,Accumulator>>{

        if slide_time < 1{
            return Err(MapErr::new(format!("slide_time must >= 1")));
        }

        if wait_time < 1{
            return Err(MapErr::new(format!("wait_time must >= 1")));
        }

        if window_time <= slide_time{
            return Err(MapErr::new(format!("slide_time must < window_time")));
        }
        if window_time <= wait_time{
            return Err(MapErr::new(format!("wait_time must < window_time")));
        }

        let timeout_time = if slide_time <= wait_time{
            slide_time
        }else{
            wait_time
        };

        let partition= if let Some(pt) = partition{
            PartitionWindow::Part(pt,HashMap::new(),PhantomData,PhantomData)
        }else{
            PartitionWindow::NoPart(WindowCheck::build_new(pair_f.clone()), PhantomData, PhantomData)
        };

        let d = match time_unit{
            TimeUnit::Sec => {
                Duration::seconds(timeout_time as i64)
            },
            TimeUnit::Mic => {
                Duration::microseconds(timeout_time as i64)
            },
            TimeUnit::Mil => {
                Duration::milliseconds(timeout_time as i64)
            },
        };

        let timeout = if auto_timeout{
            if let Some(timeout) = timer_reg_repeat(name.clone(), d){
                Some(timeout)
            }else{
                return MapResult::Err(MapErr::new(format!("[{}] timer register failed", name)));
            }
        }else{
            None
        };

        return Ok(Window{
            name: name,
            pair_f: pair_f,
            window_time: WindowIndex::new((window_time as u128) << 64),
            slide_time: WindowIndex::new((slide_time as u128) << 64),
            wait_time: WindowIndex::new((wait_time as u128) << 64),
            time_unit: time_unit,
            partition_window: partition,
            timeout: timeout,
            phantom: PhantomData,
        })
    }
    async fn insert_timeout(&mut self)->Option<TimeOutWindowGroup<K,V,T>> {
        let timeout = match self.time_unit {
            TimeUnit::Sec => { WindowElementJustIndex::new_sec() },
            TimeUnit::Mic => { WindowElementJustIndex::new_micro() },
            TimeUnit::Mil => { WindowElementJustIndex::new_mill() },
        };

        let index = timeout.index();

        match &mut self.partition_window{
            PartitionWindow::NoPart(ref mut wc, _,PhantomData) => {
                if &index > wc.borrow_last_time(){

                    if !wc.waited(){
                        wc.wait_window_set(index.clone(), self.window_time);
                    }

                    let res = wc.wait_window_check(index.clone(), self.wait_time, self.slide_time).await;

                    wc.set_last_time(&index);

                    return match res{
                        Some(t) => {Some(TimeOutWindowGroup::NoPart(t))},
                        None => {None},
                    };
                }else{
                    return None;
                }
            },
            PartitionWindow::Part(_pt, ref mut wc,_,_) => {
                let mut tmp = HashMap::new();

                for (h,ref mut wc) in wc.iter_mut(){
                    if &index > wc.borrow_last_time(){

                        if !wc.waited(){
                            wc.wait_window_set(index.clone(), self.window_time);
                        }

                        let res = wc.wait_window_check(index.clone(), self.wait_time, self.slide_time).await;

                        wc.set_last_time(&index);

                        if let Some(res) = res{
                            tmp.insert(*h, res);
                        }
                    }
                }

                if !tmp.is_empty(){
                    return Some(TimeOutWindowGroup::Part(tmp));
                }else{
                    return None;
                }
            },
        }
    }
    pub async fn insert(&mut self, element: T)->Option<WindowGroup<K,V,T>>{

        let window_time = self.window_time;
        let wait_time = self.wait_time;
        let slide_time = self.slide_time;
        //println!("element idx {:?}", element.index());

        match &mut self.partition_window{
            PartitionWindow::NoPart(ref mut wc,_,_) => {
                //println!("insert NoPart Window");

                let waited = wc.waited();
                return Window::<K,V,T,P,Accumulator>::insert_into_one_window(window_time, wait_time, slide_time,  wc, element, waited).await;
            },
            PartitionWindow::Part(pt , ref mut wc,_,_) => {
                let hash_value = partition_hash(pt, element.index_to_value()).await;

                if let Some(wc) = wc.get_mut(&hash_value){
                    let waited = wc.waited();
                    return Window::<K,V,T,P,Accumulator>::insert_into_one_window(window_time, wait_time, slide_time, wc, element, waited).await;
                }else{
                    let mut wc = WindowCheck::build_new(self.pair_f.clone());
                    let waited = wc.waited();
                    return Window::<K,V,T,P,Accumulator>::insert_into_one_window(window_time, wait_time, slide_time, &mut wc, element, waited).await;
                }
            },
        }
    }
    /// Applies one element to one `WindowCheck`.
    ///
    /// Steps, in order:
    /// 1. `first_time` becomes the element's index if it is unset (raw 0) or later than it.
    /// 2. If `last_time` is unset it becomes the element's index. Otherwise, when the index
    ///    is newer than `last_time`, the wait window is armed (unless `waited` says one is
    ///    already pending), checked, and `last_time` is advanced.
    /// 3. The element is fed to the accumulator and stored in the element map.
    ///
    /// Returns the group produced by the wait-window check in step 2, if any.
    async fn insert_into_one_window(window_time: WindowIndex, wait_time: WindowIndex, slide_time: WindowIndex,
                         window_check: &mut WindowCheck<K,V,T,Accumulator>,
                         element: T, waited: bool)->Option<WindowGroup<K,V,T>>{

        let index = element.index();

        let current_first = *window_check.borrow_first_time();
        if current_first.idx == 0 || index < current_first{
            window_check.set_first_time(&index);
        }

        let mut emitted = None;
        let current_last = *window_check.borrow_last_time();
        if current_last.idx == 0{
            window_check.set_last_time(&index);
        }else if index > current_last{
            if !waited{
                window_check.wait_window_set(index, window_time);
            }

            emitted = window_check.wait_window_check(index, wait_time, slide_time).await;

            window_check.set_last_time(&index);
        }

        // The accumulator's verdict is not used here; it is evaluated again by the wait-window check.
        let _accepted = window_check.add_and_check(&element).await;
        window_check.borrow_wait_window().1.insert(index,element);

        emitted
    }
}

